#include "config.h"

#define DBG_SUBSYS S_LIBTASK

#include "lich_api.h"
#include "yid.h"

static const char *__op2str(trans_op_t op)
{
        switch (op) {
                case TRANS_CREATE_SNAPSHOT:
                        return "trans_create_snapshot";
                        break;
                default:
                        YASSERT(0);
        }

        return "unknown";
}

static inline int volume_key(char *key, const chkid_t *chkid) {
        sprintf(key, "volume/createsnap/%s", id2str(chkid));
        return 0;
}

/**
 * case 1: 事务不存在，则创建之
 * case 2: 事务已存在，退出 (在load时REDO事务）
 *
 * 为了避免多事务的复杂性，每个卷只允许同时存在一个事务
 */
int trans_begin(trans_context_t **ctx, const void *new_trans, size_t len)
{
        int ret, valuelen;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        trans_context_t *_ctx;
        const trans_base_t *base = new_trans;

        *ctx = NULL;

        ret = ymalloc((void **)&_ctx, sizeof(trans_context_t) + len);
        if (unlikely(ret))
                GOTO(err_free1, ret);

        _ctx->base = *base;
        _ctx->is_new = 0;

        volume_key(_ctx->key, &base->chkid);

        valuelen = PAGE_SIZE;
        ret = etcd_get_bin(ETCD_TRANS, _ctx->key, buf, &valuelen, NULL);
        if (unlikely(ret)) {
                if (ret == ENOENT || ret == ENOKEY) {
                        ret = etcd_create(ETCD_TRANS, _ctx->key, new_trans, len, 0);
                        if (unlikely(ret))
                                GOTO(err_free2, ret);

                        memcpy(_ctx->buf, new_trans, len);
                        _ctx->is_new = 1;
                } else {
                        GOTO(err_free2, ret);
                }
        } else {
                YASSERT(valuelen == len);

                memcpy(_ctx->buf, buf, len);
                _ctx->is_new = 0;
        }

        DINFO("key %s\n", _ctx->key);

        *ctx = _ctx;

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_free2:
        yfree((void **)&_ctx);
err_free1:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int trans_commit(trans_context_t *ctx)
{
        int ret;

        DINFO("key %s\n", ctx->key);

        ret = etcd_del(ETCD_TRANS, ctx->key);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int trans_abort(IN const char *key)
{
        int ret;

        DINFO("key %s\n", key);

        ret = etcd_del(ETCD_TRANS, key);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

static int __do_create_snapshot__(volume_proto_t *volume_proto, trans_create_snapshot_t *trans) {
        int ret;

        YASSERT(volume_proto->chkid.type == __VOLUME_CHUNK__);

        DWARN("op %d chkid %s snap %s p %d site %s snap_from %jd snap_version %jd\n",
              trans->base.op,
              id2str(&trans->base.chkid),
              trans->snap,
              trans->priority,
              trans->site,
              trans->snap_from,
              trans->snap_version);

        // TODO fix

        ret = volume_proto->snapshot_create(volume_proto, trans->snap, trans->priority, trans->site, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * 完成创建快照的事务。如果是EDQUOT错误，则销毁事务，并正常退出。
 *
 * @Param volume_proto
 * @Param trans
 *
 * @Returns
 */
static int __do_create_snapshot(IN volume_proto_t *volume_proto, IN trans_create_snapshot_t *trans)
{
        int ret;
        char key[MAX_NAME_LEN];

        ret = __do_create_snapshot__(volume_proto, trans);
        if (unlikely(ret)) {
                if (ret == EDQUOT) {
                        DWARN("redo create "CHKID_FORMAT" snapshot fail, destroy %s\n",
                              CHKID_ARG(&volume_proto->chkid), key);
                        volume_key(key, &volume_proto->chkid);
                        ret = trans_abort(key);
                        if (ret)
                                GOTO(err_ret, ret);
                } else {
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}

/**
 * @todo 限制REDO次数，在某些极端情况下，REDO一直无法成功，导致卷加载不了
 */
int trans_redo(void *_volume_proto) {
        int ret, valuelen, idx;
        char key[MAX_NAME_LEN];
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        volume_proto_t *volume_proto = _volume_proto;

        volume_key(key, &volume_proto->chkid);

        // TODO scan multi transactions
        valuelen = PAGE_SIZE;
        ret = etcd_get_bin(ETCD_TRANS, key, buf, &valuelen, &idx);
        if (unlikely(ret)) {
                if (ret == ENOKEY) {
                        goto out;
                } else {
                        GOTO(err_ret, ret);
                }
        }

        trans_base_t *base = (void *)buf;

        DINFO("redo %d th %s\n", base->redo, __op2str(base->op));

        /*判断redo次数*/
        if (base->redo < TRANS_REDO_MAX) {
                base->redo++;
                ret = etcd_update(ETCD_TRANS, key, buf, valuelen, &idx, 0);
                if (ret)
                        GOTO(err_ret, ret);
        } else {
                DWARN("redo too many, destroy op %s, chkid: "CHKID_FORMAT"\n",
                      __op2str(base->op),
                      CHKID_ARG(&volume_proto->chkid));
                trans_abort(key);
                goto out;
        }

        if (base->op == TRANS_CREATE_SNAPSHOT) {
                trans_create_snapshot_t *trans = (void *)buf;
                ret = __do_create_snapshot(volume_proto, trans);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        } else {
                YASSERT(0);
        }

out:
        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}
